[SPARK-57001][SS] Hoist isStateful / containsStatefulOperator onto LogicalPlan#56057
HeartSaVioR wants to merge 1 commit
…o `LogicalPlan`

### What changes were proposed in this pull request?

Introduce two new methods on `LogicalPlan`:

- `def isStateful: Boolean = false` -- per-operator declaration of whether the node is a streaming stateful operator (kept across microbatches).
- `def containsStatefulOperator: Boolean` -- subtree-level check, memoized.

Override `isStateful` on the operators that are streaming stateful: `Aggregate`, `Join` (stream-stream), `GlobalLimit`, `Distinct`, `Deduplicate`, `DeduplicateWithinWatermark`, `FlatMapGroupsWithState`, `FlatMapGroupsInPandasWithState`, `TransformWithState`, `TransformWithStateInPySpark`.

### Why are the changes needed?

Several upcoming streaming-side rules (e.g. an optimizer rule that widens `AttributeReference` nullability around stateful operators) need an `isStateful` / `containsStatefulOperator` notion on `LogicalPlan` itself rather than having each rule re-derive the stateful-operator check via pattern matching.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing `UnsupportedOperationCheckerSuite` and streaming test suites cover the behavior preservation. No new tests are added in this commit; subsequent PRs that build on `isStateful` will add targeted tests.

### Was this patch authored or co-authored using generative AI tooling?

Yes.
cc. @cloud-fan Please take a look, thanks!
cloud-fan left a comment
What this PR does
Adds two methods on LogicalPlan:
- `def isStateful: Boolean = false`: per-node predicate, overridden on the ten logical operators that become `StateStoreWriter`s at execution (`Aggregate`, stream-stream `Join`, `GlobalLimit`, `Distinct`, `Deduplicate`, `DeduplicateWithinWatermark`, `FlatMapGroupsWithState`, `FlatMapGroupsInPandasWithState`, `TransformWithState`, `TransformWithStateInPySpark`), each gated on `child.isStreaming` (or `left.isStreaming && right.isStreaming` for `Join`).
- `def containsStatefulOperator: Boolean`: memoized subtree-level OR, backed by a `private[this] lazy val`.
Design notes
The two-method shape is well-motivated: consumers ask two distinct questions. The per-node question ("is this the stateful op?") is what lets plan.foreach { sub => if (sub.isStateful) ... } and plan.collect { case p if p.isStateful => p } replace ten-arm pattern matches. The subtree question ("does this plan contain any?") is what MicroBatchExecution.disableAQESupportInStatelessIfUnappropriated and SequentialUnionAnalysis care about. isStreaming only ever needs the subtree question (only leaf relations introduce the property), which is why one method suffices there.
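To make the two questions concrete, here is a minimal sketch on a toy plan tree. `Plan`, `Source`, `Project`, `Aggregate`, `Join`, and `PlanDemo` are hypothetical stand-ins written for this illustration, not Catalyst's classes, and the toy `collect` only approximates `TreeNode.collect`. The PR itself backs `containsStatefulOperator` with a `private[this] lazy val`; the sketch uses a plain `lazy val` for brevity.

```scala
// Toy stand-in for a logical plan tree; none of these types are Spark's.
sealed trait Plan {
  def children: Seq[Plan]

  // Subtree-level property: true if any leaf below is a streaming source.
  def isStreaming: Boolean = children.exists(_.isStreaming)

  // Per-node predicate; only stateful operators override it.
  def isStateful: Boolean = false

  // Subtree-level OR that includes this node, computed at most once per node.
  lazy val containsStatefulOperator: Boolean =
    isStateful || children.exists(_.containsStatefulOperator)

  // Pre-order traversal, approximating TreeNode.collect.
  def collect[A](pf: PartialFunction[Plan, A]): Seq[A] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))
}

case class Source(streaming: Boolean) extends Plan {
  def children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}

case class Project(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

case class Aggregate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  // Stateful only when the input is streaming.
  override def isStateful: Boolean = child.isStreaming
}

case class Join(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
  // Only a stream-stream join keeps state.
  override def isStateful: Boolean = left.isStreaming && right.isStreaming
}

object PlanDemo {
  val streamingPlan: Plan = Project(Aggregate(Source(streaming = true)))
  val batchPlan: Plan = Project(Aggregate(Source(streaming = false)))

  // The per-node question replaces a multi-arm pattern match.
  val statefulNodes: Seq[Plan] =
    streamingPlan.collect { case p if p.isStateful => p }
}
```

In this toy model the per-node question picks out only the `Aggregate` over a streaming source, while the subtree question is answered at the root without the caller walking the tree.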
Coverage check against the physical StateStoreWriter operators (StreamingGlobalLimitExec, StateStoreSaveExec / SessionWindowStateStoreSaveExec from Aggregate, StreamingDeduplicateExec, StreamingDeduplicateWithinWatermarkExec, StreamingSymmetricHashJoinExec, FlatMapGroupsWithStateExec, TransformWithStateExec(InPySpark)): the override list is complete. StreamingLocalLimitExec is correctly excluded (not a StateStoreWriter); UpdateEventTimeColumnExec is correctly excluded (not stateful).
One concern worth addressing in this PR
The new isStateful set matches MicroBatchExecution.containsStatefulOperator exactly, but diverges from UnsupportedOperationChecker.isStatefulOperation in two places: Deduplicate counts as stateful here regardless of whether its keys carry an event-time column, and streaming GlobalLimit is included here but not there. Those two checks aren't really competing — isStatefulOperation is scoped to the chained-watermark-correctness analysis ("ops that can emit late rows"), while the PR's isStateful is the broader runtime "uses a StateStoreWriter" view — but the PR description ("currently we ask each rule to re-derive the stateful-operator check via pattern matching") implies this should replace such pattern matches, which would include isStatefulOperation. Migrating those callers blindly would be a silent semantic change.
The fix is documentation: pin down in the Scaladoc what question this API answers (runtime / StateStoreWriter view) and explicitly note that isStatefulOperation answers a narrower question and is not a drop-in replacement target. See the inline comment on LogicalPlan.scala.
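The divergence can be illustrated with a small toy model. Everything below (`Op`, `AggOp`, `LimitOp`, `DedupOp`, `ProjectOp`, `StatefulViews`) is hypothetical and covers only a few operators. It assumes, as the description above implies, that the narrower check counts a streaming `Deduplicate` only when its keys carry an event-time column and never counts `GlobalLimit`.

```scala
// Toy operators for illustration only; these are not Spark's classes.
sealed trait Op { def isStreaming: Boolean }
case class AggOp(isStreaming: Boolean) extends Op
case class LimitOp(isStreaming: Boolean) extends Op
case class DedupOp(isStreaming: Boolean, keysHaveEventTime: Boolean) extends Op
case class ProjectOp(isStreaming: Boolean) extends Op

object StatefulViews {
  // Broader runtime view: anything that would keep state at execution.
  def runtimeStateful(op: Op): Boolean = op match {
    case AggOp(streaming)      => streaming
    case LimitOp(streaming)    => streaming
    case DedupOp(streaming, _) => streaming
    case _: ProjectOp          => false
  }

  // Narrower view (assumed shape of the chained-watermark check):
  // no global limit, and dedup only when keys carry event time.
  def watermarkStateful(op: Op): Boolean = op match {
    case AggOp(streaming)                 => streaming
    case DedupOp(streaming, hasEventTime) => streaming && hasEventTime
    case _: LimitOp | _: ProjectOp        => false
  }

  // Operators on which swapping one check for the other changes the answer.
  def divergent(ops: Seq[Op]): Seq[Op] =
    ops.filter(op => runtimeStateful(op) != watermarkStateful(op))
}
```

Under these assumptions, a caller that switched from the narrower check to the broader one would start treating streaming limits and event-time-free dedups as stateful, which is the silent semantic change the review warns about.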
Follow-ups (non-blocking)
- MicroBatchExecution.containsStatefulOperator (the private def in the AQE-disable check, around line 527) is now exactly equivalent to analyzedPlan.containsStatefulOperator, a natural cleanup in a follow-up.
- A small unit test (batch plan → false; streaming aggregate/dedup/limit/join/etc. → containsStatefulOperator == true; memoization fires once) would help guard the future migrations.
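A rough shape for such a test, sketched against a toy node rather than Spark's `LogicalPlan`: `Node`, its `evaluations` counter, and `ContainsStatefulOperatorCheck` are hypothetical names introduced here, and the counter exists only so the check can observe memoization. A real test would build Catalyst plans and use the project's test framework instead of bare `assert`.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Toy plan node, not Spark's LogicalPlan.
class Node(stateful: Boolean, val children: Seq[Node]) {
  // Counts how many times the subtree check is actually computed.
  val evaluations = new AtomicInteger(0)

  def isStateful: Boolean = stateful

  // Memoized subtree-level OR, including this node.
  private lazy val cached: Boolean = {
    evaluations.incrementAndGet()
    isStateful || children.exists(_.containsStatefulOperator)
  }

  def containsStatefulOperator: Boolean = cached
}

object ContainsStatefulOperatorCheck {
  def run(): Unit = {
    // Batch-like plan: no node is stateful.
    val batch = new Node(false, Seq(new Node(false, Nil)))
    assert(!batch.containsStatefulOperator)

    // Streaming-like plan: a stateful node sits below the root.
    val streaming = new Node(false, Seq(new Node(true, Nil)))
    assert(streaming.containsStatefulOperator)

    // A second call must reuse the cached result.
    assert(streaming.containsStatefulOperator)
    assert(streaming.evaluations.get == 1)
  }
}
```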
/** Marks if a streaming node is a stateful operator. */
def isStateful: Boolean = false

/** Marks if a subplan contains a stateful operator. */
Two suggestions for the Scaladoc:
- "Marks if" is awkward: these return a boolean rather than marking anything. "Whether …" or "Returns true if …" is more conventional. For containsStatefulOperator, please also say it includes this (the body reads isStateful || children.exists(...)).
- More substantively, please nail down what "stateful" means here. The new definition is the streaming-runtime view (any operator that becomes a StateStoreWriter at execution) and matches MicroBatchExecution.containsStatefulOperator exactly. It diverges from UnsupportedOperationChecker.isStatefulOperation on two operators: Deduplicate is stateful here regardless of whether keys carry an event-time column, and streaming GlobalLimit is included here but not there. Calling that out, and noting that isStatefulOperation is intentionally narrower (scoped to the chained-watermark correctness check) and isn't a drop-in replacement target, will keep future PRs from silently swapping callers and changing analyzer semantics. Worth naming which existing checks are intended replacement targets, too.
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): Distinct =
  copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
This override is non-obvious at the Distinct layer — Distinct doesn't directly become a StateStoreWriter. The existing comment in UnsupportedOperationChecker.isStatefulOperation explains it: "Since the Distinct node will be replaced to Aggregate in the optimizer rule ReplaceDistinctWithAggregate, here we also need to check all Distinct node by assuming it as Aggregate." Worth preserving that rationale here, or at least a // see ReplaceDistinctWithAggregate one-liner.
What changes were proposed in this pull request?
Introduce two new methods on LogicalPlan:
- def isStateful: Boolean = false -- per-operator declaration of whether the node is a streaming stateful operator (kept across microbatches).
- def containsStatefulOperator: Boolean -- subtree-level check, memoized.
Override isStateful on the operators that are streaming stateful: Aggregate, Join (stream-stream), GlobalLimit, Distinct, Deduplicate, DeduplicateWithinWatermark, FlatMapGroupsWithState, FlatMapGroupsInPandasWithState, TransformWithState, TransformWithStateInPySpark.
Why are the changes needed?
This will be used as a convenient utility for future work. Currently we ask each rule to re-derive the stateful-operator check via pattern matching.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
N/A.
Was this patch authored or co-authored using generative AI tooling?
Yes. Generated-by: Claude 4.6 Opus